-
Notifications
You must be signed in to change notification settings - Fork 0
Clone kafka 18894 #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughSwitches config provider handling from direct instances to Plugin-wrapped providers across core clients and Connect runtime. Updates constructors, maps, and close logic to operate on Plugin. Adds Monitorable-capable test provider and metrics wiring/tests. Minor Javadoc and formatting updates. Tests adjusted to use Plugin wrappers and Map.of. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Admin as Admin/Worker.init
participant Plugins as Plugins
participant Plugin as Plugin<ConfigProvider>
participant Provider as ConfigProvider
participant WCT as WorkerConfigTransformer
participant CT as ConfigTransformer
participant Metrics as Metrics
Admin->>Plugins: newConfigProvider(config, providerName, classLoaderUsage, Metrics)
Plugins-->>Admin: Plugin.wrapInstance(provider, metrics, tags)
Admin->>WCT: new WorkerConfigTransformer(worker, Map{name -> Plugin})
WCT->>CT: new ConfigTransformer(Map{name -> Plugin})
note over CT: During transform
CT->>Plugin: get()
Plugin-->>CT: ConfigProvider
CT->>Provider: get(path/keys)
Provider-->>CT: ConfigData
CT-->>WCT: Transformed configs
note over Plugins,Provider: If provider implements Monitorable
Plugins->>Provider: withPluginMetrics(PluginMetrics)
Provider-->>Metrics: register gauges/counters
sequenceDiagram
autonumber
actor MM as MirrorMakerConfig.transform
participant Plugins as Plugins
participant Plugin as Plugin<ConfigProvider>
participant CT as ConfigTransformer
MM->>Plugins: newConfigProvider(config, providerName, usage, Metrics)
Plugins-->>MM: Plugin<ConfigProvider>
MM->>CT: new ConfigTransformer(Map{name -> Plugin})
MM->>CT: transform(configs)
CT-->>MM: Transformed configs
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (3 warnings)
✨ Finishing touches
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.39.5)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaThanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a fundamental refactoring to how Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Refacto is reviewing this PR. Please wait for the review comments to be posted. |
🔒 Limited Access – You're on the free plan.🚀 Upgrade to Pro and unlock:
---## Clone kafka 18894: Add metrics support for ConfigProvider plugins TL;DR: Enhances ConfigProvider with metrics support through the Plugin wrapper, enabling monitoring capabilities for config providers. Refacto PR SummaryAdds metrics support to ConfigProvider plugins via the Plugin wrapper class and Monitorable interface integration. Change HighlightsClick to expand
Sequence DiagramsequenceDiagram
participant Worker
participant Plugins
participant ConfigProvider
participant Plugin
participant Metrics
Worker->>Plugins: newConfigProvider(config, providerName, ClassLoaderUsage, metrics)
Plugins->>Plugins: Load and configure ConfigProvider
Plugins->>Plugin: wrapInstance(provider, metrics, CONFIG_PROVIDERS_CONFIG, tags)
Plugin->>ConfigProvider: Check if implements Monitorable
ConfigProvider->>Metrics: Register metrics (if Monitorable)
Plugins-->>Worker: Return Plugin<ConfigProvider>
Worker->>Worker: Create ConfigTransformer with Plugin-wrapped providers
Testing GuideClick to expand
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a mechanism to expose metrics from ConfigProvider implementations by wrapping them in a Plugin object. The changes are consistently applied across the client and connect modules, enabling metrics for config providers in Kafka Connect. The core logic is sound, and tests are updated and added to cover the new functionality. I have one suggestion to improve the consistency of metric tagging for config providers used outside of the Kafka Connect runtime, which will also align the implementation with the updated documentation.
| ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); | ||
| provider.configure(configProperties); | ||
| configProviderInstances.put(entry.getKey(), provider); | ||
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To ensure consistent metric tagging for ConfigProviders across different use cases (client-side vs. Kafka Connect), it would be beneficial to also add the provider tag here. This would align with the behavior in connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java and make the updated Javadoc in ConfigProvider.java accurate for all scenarios. The provider name is available here as entry.getKey().
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); | |
| Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG, Map.of("provider", entry.getKey())); |
|
/refacto-test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
270-286: Close provider plugins in a finally block.If
transformer.transform(props)throws, the current code never reaches theforEachthat closes thePlugin<ConfigProvider>instances, leaking both the provider and any attached plugin metrics. Wrapping the transform call in atry/finallyrestores the previous cleanup guarantees and matches our usual pattern for plugin lifecycle management.- ConfigTransformer transformer = new ConfigTransformer(providerPlugins); - Map<String, String> transformed = transformer.transform(props).data(); - providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + ConfigTransformer transformer = new ConfigTransformer(providerPlugins); + try { + Map<String, String> transformed = transformer.transform(props).data(); + return transformed; + } finally { + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + } - return transformed; + return null; // replace with the real return in the try block(Adjust the placement so the actual return happens inside the
trywhile keeping thefinallysolely for cleanup.)clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
532-560: Ensure config provider plugins are always closed.
ConfigTransformer.transform(indirectVariables)can throw (bad provider output, parse errors, etc.), and when it does we currently leak everyPlugin<ConfigProvider>created in this method. This regression matters now that plugins may allocate metric resources. Wrapping the transform call in atry/finallykeeps the lifecycle symmetrical:- if (!providerPlugins.isEmpty()) { - ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); - ConfigTransformerResult result = configTransformer.transform(indirectVariables); - if (!result.data().isEmpty()) { - resolvedOriginals.putAll(result.data()); - } - } - providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + if (!providerPlugins.isEmpty()) { + ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); + try { + ConfigTransformerResult result = configTransformer.transform(indirectVariables); + if (!result.data().isEmpty()) { + resolvedOriginals.putAll(result.data()); + } + } finally { + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + } + }This mirrors our other plugin-wrapping sites and prevents resource leakage on failure.
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
65-69: Refresh the constructor Javadoc.The parameter doc still refers to raw
ConfigProviderinstances, but the signature now takesPlugin<ConfigProvider>. Please update the wording so the documentation matches the API and avoids confusion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
Vagrantfile(1 hunks)clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics(6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig(58-463)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins(59-696)
| public ConfigData get(String path, Set<String> keys) { | ||
| if (path.equals(TEST_PATH)) { | ||
| if (keys.contains(TEST_KEY)) { | ||
| return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT)); | ||
| return new ConfigData(Map.of(TEST_KEY, TEST_RESULT)); | ||
| } else if (keys.contains(TEST_KEY_WITH_TTL)) { | ||
| return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); | ||
| return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); | ||
| } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { | ||
| return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); | ||
| return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); | ||
| } | ||
| } | ||
| return new ConfigData(Collections.emptyMap()); | ||
| return new ConfigData(Map.of()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore support for multi-key lookups in the test provider.
ConfigTransformer aggregates all keys per provider/path into a single Set and calls ConfigProvider#get(path, keys) once. The new implementation returns immediately on the first match, so if two keys share the same path (e.g., ${test:testPath:testKey} and ${test:testPath:testKeyWithTTL} in the same config), only one value is supplied and the other substitution silently fails. The previous version accumulated all matches in a map, so the regression drops coverage for the core multi-key behavior we rely on in production.
Please revert to building the result map before returning, so every requested key is populated and TTL is derived consistently, e.g.:
- if (path.equals(TEST_PATH)) {
- if (keys.contains(TEST_KEY)) {
- return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
- } else if (keys.contains(TEST_KEY_WITH_TTL)) {
- return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
- } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
- return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
- }
- }
- return new ConfigData(Map.of());
+ Map<String, String> data = new HashMap<>();
+ Long ttl = null;
+ if (path.equals(TEST_PATH)) {
+ if (keys.contains(TEST_KEY)) {
+ data.put(TEST_KEY, TEST_RESULT);
+ }
+ if (keys.contains(TEST_KEY_WITH_TTL)) {
+ data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL);
+ ttl = 1L;
+ }
+ if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
+ data.put(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL);
+ ttl = 10L;
+ }
+ }
+ return new ConfigData(data, ttl);This keeps the stub faithful to the contract and ensures the worker transformer tests continue to cover multi-key resolutions.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public ConfigData get(String path, Set<String> keys) { | |
| if (path.equals(TEST_PATH)) { | |
| if (keys.contains(TEST_KEY)) { | |
| return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT)); | |
| return new ConfigData(Map.of(TEST_KEY, TEST_RESULT)); | |
| } else if (keys.contains(TEST_KEY_WITH_TTL)) { | |
| return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); | |
| return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); | |
| } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { | |
| return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); | |
| return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); | |
| } | |
| } | |
| return new ConfigData(Collections.emptyMap()); | |
| return new ConfigData(Map.of()); | |
| } | |
| public ConfigData get(String path, Set<String> keys) { | |
| Map<String, String> data = new HashMap<>(); | |
| Long ttl = null; | |
| if (path.equals(TEST_PATH)) { | |
| if (keys.contains(TEST_KEY)) { | |
| data.put(TEST_KEY, TEST_RESULT); | |
| } | |
| if (keys.contains(TEST_KEY_WITH_TTL)) { | |
| data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL); | |
| ttl = 1L; | |
| } | |
| if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { | |
| data.put(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL); | |
| ttl = 10L; | |
| } | |
| } | |
| return new ConfigData(data, ttl); | |
| } |
🤖 Prompt for AI Agents
In
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
around lines 147-158, the test ConfigProvider#get implementation currently
returns on the first matching key and thus fails multi-key lookups; change it to
build a result map for all requested keys for the TEST_PATH by iterating the
provided keys set, adding each matching key/value pair to the map and collecting
any TTLs, then return a single ConfigData containing the full map and a
consistently derived TTL (e.g., compute and pass the appropriate TTL after
collecting all matches) so every requested key is populated instead of returning
early.
|
/refacto-bot |
Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.
If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.
If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.
Summary by CodeRabbit
New Features
Documentation
Refactor
Tests